package day03.sink;

import beans.SensorReading;
import day02.source.MySensor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * Flink DataStream API - sink example.
 * <p>
 * Writes the records of a stream into MySQL through a custom
 * {@link RichSinkFunction} backed by plain JDBC.
 *
 * @author lvbingbing
 * @date 2021-12-28 19:33
 */
public class FlinkSink03 {
    /**
     * Builds and runs the job: reads {@link SensorReading} records from the
     * custom {@link MySensor} source and writes them to MySQL.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job execution fails
     */
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment; the whole job runs with parallelism 1.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. Read data from the custom source.
        DataStream<SensorReading> dataStream = env.addSource(new MySensor());
        // 3. Write the data to MySQL.
        studyWriteToMySql(dataStream);
        // 4. Trigger the job execution.
        env.execute();
    }

    /**
     * Attaches the custom JDBC sink ({@link MyJdbcSink}) to the given stream.
     *
     * @param sensorReadingStream stream of sensor readings to persist
     */
    private static void studyWriteToMySql(DataStream<SensorReading> sensorReadingStream) {
        sensorReadingStream.addSink(new MyJdbcSink());
    }

    /**
     * Sink that upserts each {@link SensorReading} into the {@code sensor_temp}
     * table: it first tries to update the row with the reading's id and inserts a
     * new row only when the update affected no rows.
     * <p>
     * The JDBC resources are {@code transient}; they are created in
     * {@link #open(Configuration)} and released in {@link #close()}.
     */
    private static class MyJdbcSink extends RichSinkFunction<SensorReading> {

        private static final long serialVersionUID = 6776061796351477436L;

        private transient Connection connection = null;

        private transient PreparedStatement insertPreparedStatement = null;

        private transient PreparedStatement updatePreparedStatement = null;

        /**
         * Opens the JDBC connection and prepares the insert and update statements
         * that {@link #invoke(SensorReading, Context)} reuses for every record.
         *
         * @param parameters configuration passed in by the caller (not used here)
         * @throws Exception if the connection cannot be opened or a statement
         *                   cannot be prepared
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            // NOTE(review): URL and credentials are hard-coded; consider externalizing them.
            connection = DriverManager.getConnection("jdbc:mysql://hadoop104:3306/test", "atguigu", "139559");
            insertPreparedStatement = connection.prepareStatement("insert into sensor_temp (id, temp) values(?, ?)");
            updatePreparedStatement = connection.prepareStatement("update sensor_temp set temp = ? where id = ?");
        }

        /**
         * Executes the SQL for one incoming record: update first, insert if the
         * update affected no rows.
         *
         * @param sensorReading the reading to update or insert
         * @param context       sink context (not used)
         * @throws Exception if executing either statement fails
         */
        @Override
        public void invoke(SensorReading sensorReading, Context context) throws Exception {
            // Try the update first; executeUpdate() returns the affected-row count directly.
            updatePreparedStatement.setDouble(1, sensorReading.getTemperature());
            updatePreparedStatement.setString(2, sensorReading.getId());
            int updateCount = updatePreparedStatement.executeUpdate();
            if (updateCount != 0) {
                return;
            }
            // No row was updated, so insert a new one.
            // NOTE(review): update-then-insert is not atomic; presumably safe here because
            // the job runs with parallelism 1 - confirm before raising the parallelism.
            insertPreparedStatement.setString(1, sensorReading.getId());
            insertPreparedStatement.setDouble(2, sensorReading.getTemperature());
            insertPreparedStatement.executeUpdate();
        }

        /**
         * Releases the prepared statements and the connection.
         * <p>
         * Every resource is closed even when closing an earlier one throws; the
         * first failure is propagated and later ones are attached to it as
         * suppressed exceptions.
         *
         * @throws Exception if closing any of the resources fails
         */
        @Override
        public void close() throws Exception {
            // try-with-resources skips null resources and closes in reverse declaration
            // order: insert statement, update statement, then the connection.
            try (Connection conn = connection;
                 PreparedStatement update = updatePreparedStatement;
                 PreparedStatement insert = insertPreparedStatement) {
                // Nothing to do: the block exists only to close the resources safely.
            }
        }
    }
}
